package app

import (
	"sync"

	"k8s.io/apimachinery/pkg/util/sets"

	"cvevulner/cve-ddd/domain"
)

// concurrentNum is the number of worker goroutines started by handleTask.
const concurrentNum = 10

// NewConcurrency returns a concurrency helper that filters d with the given
// cold patch service.
//
// p and h are stored as the package set and the handle-branch set and are
// later handed to the service's filterData for every item. Both internal
// channels are buffered to len(d), so every item of d can be queued, and every
// item can be emitted as a result, without a send ever blocking.
func NewConcurrency(c *coldPatchService, d domain.CollectedDataSlice, p, h sets.Set[string]) *concurrency {
	size := len(d)

	worker := new(concurrency)
	worker.coldPatch = c
	worker.data = d
	worker.packageSets = p
	worker.handleBranchSets = h
	worker.taskChan = make(chan domain.CollectedData, size)
	worker.filteredChan = make(chan domain.CollectedData, size)

	return worker
}

// concurrency fans a slice of collected data out to a fixed number of worker
// goroutines, filters each item through coldPatch.filterData, and gathers the
// items that were kept.
type concurrency struct {
	// data is the input; sendTask queues every element of it on taskChan.
	data domain.CollectedDataSlice
	// coldPatch supplies filterData and the logger used for error reporting.
	coldPatch *coldPatchService
	// packageSets is passed to filterData as its last argument.
	packageSets sets.Set[string]
	// handleBranchSets is passed to filterData as its second argument.
	handleBranchSets sets.Set[string]

	// wg tracks the worker goroutines started by handleTask.
	wg sync.WaitGroup
	// taskChan carries the items waiting to be filtered; sendTask closes it.
	taskChan chan domain.CollectedData
	// filteredChan carries the items that passed the filter; waitJob closes it.
	filteredChan chan domain.CollectedData
}

// handleFilterData runs the whole pipeline once and returns the items that
// survived filtering. The order of the returned items depends on goroutine
// scheduling and is not guaranteed to match the input order.
func (c *concurrency) handleFilterData() domain.CollectedDataSlice {
	// The steps must run in this order: the task queue is filled and closed
	// first, then the workers drain it, then filteredChan is closed once all
	// workers are done so that the final collection loop can terminate.
	c.sendTask()
	c.handleTask()
	c.waitJob()

	filtered := c.receiveData()

	return filtered
}

// handleTask starts concurrentNum worker goroutines that drain taskChan and
// hand every item to processTask. It returns immediately; call waitJob to
// block until all workers have finished.
func (c *concurrency) handleTask() {
	c.wg.Add(concurrentNum)

	for i := 0; i < concurrentNum; i++ {
		go func() {
			defer c.wg.Done()

			// The loop ends when taskChan is closed and empty.
			for v := range c.taskChan {
				c.processTask(v)
			}
		}()
	}
}

// processTask filters a single item and forwards it to filteredChan when it
// is neither rejected with an error nor marked to be skipped.
//
// The panic recovery is scoped to one item on purpose: a panic raised while
// filtering one record is logged and only that record is lost, while the
// worker goroutine keeps consuming the remaining tasks. Recovering at the
// goroutine level instead would end the worker's loop, and once every worker
// had panicked the rest of the queue would be dropped silently.
func (c *concurrency) processTask(v domain.CollectedData) {
	defer c.recovery()

	// filterData receives a pointer to the local copy, so any change it makes
	// to the item is carried along when the item is forwarded below.
	skip, err := c.coldPatch.filterData(&v, c.handleBranchSets, c.packageSets)
	if err != nil {
		c.coldPatch.log.Errorf("filter data %s failed:%v", v.CveNum, err)
		return
	}
	if skip {
		return
	}

	c.filteredChan <- v
}

// sendTask queues every element of data on taskChan and then closes the
// channel, which is the signal for the workers' range loops to end once the
// queue has been drained.
func (c *concurrency) sendTask() {
	for i := range c.data {
		c.taskChan <- c.data[i]
	}

	close(c.taskChan)
}

// waitJob blocks until every worker goroutine registered on wg has finished
// and then closes filteredChan so that receiveData's range loop can end.
func (c *concurrency) waitJob() {
	c.wg.Wait()
	// Closing only after Wait returns ensures no worker can still be sending.
	close(c.filteredChan)
}

// receiveData collects everything queued on filteredChan and returns it as a
// slice. It blocks until filteredChan has been closed, and returns a nil
// slice when no item was received.
func (c *concurrency) receiveData() domain.CollectedDataSlice {
	var kept []domain.CollectedData

	for {
		item, ok := <-c.filteredChan
		if !ok {
			// Channel closed and fully drained.
			return kept
		}

		kept = append(kept, item)
	}
}

// recovery stops a panic in the calling goroutine and logs its value. It only
// has an effect when it is itself the deferred call (defer c.recovery()),
// because recover must be invoked directly by the deferred function.
func (c *concurrency) recovery() {
	r := recover()
	if r == nil {
		return
	}

	c.coldPatch.log.Errorf("handle filter data panic %v", r)
}
